package com.example.demo05redisson.delayqueue;

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBucket;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Optional;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class RedisDelayQueueUtil {

    @Autowired
    private RedissonClient redissonClient;

    // 前缀用于生成可取消任务的Redis键
    private static final String CANCELLABLE_TASK_PREFIX = "cancellable_task_";

    /**
     * 添加延迟队列
     *
     * @param value     队列值
     * @param delay     延迟时间
     * @param timeUnit  时间单位
     * @param queueCode 队列键
     * @param <T>       队列值的类型
     * @return 返回任务ID，用于取消任务
     */
    public <T> void add(RedisDelayQueueValue<T> value, long delay, TimeUnit timeUnit, String queueCode) {
        String taskId = value.getId();
        String cancellableTaskKey = CANCELLABLE_TASK_PREFIX + taskId;
        RBucket<T> cancellableTaskBucket = redissonClient.getBucket(cancellableTaskKey);
        try {
            RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(queueCode);
            RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
            cancellableTaskBucket.set(value.getValue());
            delayedQueue.offer(taskId, delay, timeUnit);
            log.info("添加延时队列成功，队列键：{}，任务ID：{}，延迟时间：{}秒", queueCode, taskId, timeUnit.toSeconds(delay));
        } catch (Exception e) {
            // 如果添加失败，删除可能已创建的可取消任务键
            cancellableTaskBucket.delete();
            log.error("添加延时队列失败", e);
            throw new RuntimeException("添加延时队列失败", e);
        }
    }

    /**
     * 获取延迟队列
     *
     * @param queueCode 队列键
     * @param <T>       队列值的类型
     * @return 队列值
     */
    public <T> T get(String queueCode) {
        RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(queueCode);
        String taskId = blockingDeque.poll();
        if (taskId != null) {
            // 如果获取到了任务ID，从可取消任务标识中获取任务值，并删除该标识
            RBucket<T> bucket = redissonClient.getBucket(CANCELLABLE_TASK_PREFIX + taskId);
            T value = bucket.get();
            bucket.delete();
            return value;
        }
        return null;
    }

    /**
     * 取消延迟队列中的任务（如果任务尚未执行）
     *
     * @param taskId 任务ID
     * @return 如果任务成功取消则返回true，否则返回false
     */
    public <T> boolean cancel(String taskId) {
        // 尝试删除可取消任务标识，如果成功则说明任务还没有被执行
        RBucket<T> bucket = redissonClient.getBucket(CANCELLABLE_TASK_PREFIX + taskId);
        T t = bucket.get();
        Optional.ofNullable(t).orElseThrow(() -> new RuntimeException("取消延时队列失败"));
        log.info("取消延时队列成功，{}", t);
        return bucket.delete();
    }
}

